package com.k2data.platform.backup.consumer;

import com.k2data.common.EnvConf;
import com.k2data.platform.backup.common.Constants;
import com.k2data.platform.backup.common.ParamNames;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
 * Looks up Kafka topic metadata by querying brokers through {@link SimpleConsumer}.
 *
 * <p>Every broker host in the configured list is contacted on the same port, one at a
 * time, until one of them answers the metadata request.
 *
 * Created by yn on 3/16/17.
 */
public class MetaDataConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(MetaDataConsumer.class);

    /** Socket timeout passed to {@link SimpleConsumer} for each metadata request. */
    private static final int SOCKET_TIMEOUT_MS = 100000;
    /** Buffer size passed to {@link SimpleConsumer}. */
    private static final int BUFFER_SIZE_BYTES = 64 * 1024;
    /** Client id the lookup requests identify themselves with. */
    private static final String CLIENT_ID = "partitionNumLookup";

    private final List<String> brokers;
    private final int port;

    /**
     * Creates a consumer that queries the given broker hosts on the given port.
     *
     * @param brokers broker host names, tried in iteration order; {@code null} is treated as
     *                an empty list, so lookups then return 0
     * @param port    port used for every broker
     */
    public MetaDataConsumer(List<String> brokers, int port) {
        // Normalize null here so getPartitionNum never fails with an NPE on the loop.
        this.brokers = (brokers == null) ? Collections.<String>emptyList() : brokers;
        this.port = port;
    }

    /**
     * Creates a consumer whose port is read from the environment setting
     * {@link ParamNames#KAFKA_BROKER_PORT}, falling back to {@link Constants#KAFKA_BROKER_PORT}.
     *
     * @param brokers broker host names, tried in iteration order
     * @throws NumberFormatException if the configured port value is not an integer
     */
    public MetaDataConsumer(List<String> brokers) {
        this(brokers, Integer.parseInt(EnvConf.getEvnValue(ParamNames.KAFKA_BROKER_PORT, String.valueOf(Constants.KAFKA_BROKER_PORT))));
    }

    /**
     * Returns the number of partitions reported for a topic.
     *
     * <p>Brokers are asked in order and the first usable answer wins. A broker that cannot be
     * reached, fails, or returns no metadata is logged at WARN level and skipped.
     *
     * @param topic name of the topic to look up
     * @return the partition count from the first broker that answered, or 0 if no broker
     *         produced an answer (0 is therefore also the "unknown" value)
     */
    public int getPartitionNum(String topic) {
        for (String broker : this.brokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(broker, this.port, SOCKET_TIMEOUT_MS, BUFFER_SIZE_BYTES, CLIENT_ID);
                TopicMetadataRequest req = new TopicMetadataRequest(Collections.singletonList(topic));
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                List<TopicMetadata> metaData = resp.topicsMetadata();
                if (metaData == null || metaData.isEmpty()) {
                    // Nothing to read from this broker; try the next one instead of
                    // relying on an IndexOutOfBoundsException to get there.
                    LOG.warn("Broker [{}] returned no metadata for [{}]", broker, topic);
                    continue;
                }

                List<PartitionMetadata> partitions = metaData.get(0).partitionsMetadata();
                return partitions.size();
            } catch (Exception e) {
                // Best effort: a failing broker must not abort the lookup. The exception is
                // passed as the last argument so SLF4J records its stack trace.
                LOG.warn("Error communicating with Broker [{}] to find partitionNum for [{}]", broker, topic, e);
            } finally {
                if (consumer != null) {
                    consumer.close();
                }
            }
        }

        return 0;
    }

    /**
     * Manual smoke test: prints the partition count of topic {@code yntest} as reported by a
     * broker on {@code localhost}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        MetaDataConsumer consumer = new MetaDataConsumer(Arrays.asList("localhost"));
        System.out.println(consumer.getPartitionNum("yntest"));
    }
}
